package io.confluent.examples.streams


import org.apache.kafka.common.serialization._
import org.apache.kafka.streams.kstream.KStreamBuilder
import org.apache.kafka.streams.kstream.KStream
import java.util.Properties
import org.apache.kafka.streams.StreamsConfig
import org.apache.kafka.common.serialization.Serdes.ByteArraySerde
import org.apache.kafka.common.serialization.Serdes.StringSerde
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KeyValue
import org.apache.kafka.streams.KafkaStreams

/**
  * Demonstrates how to perform simple, state-less transformations via map functions.
  * Same as [[MapFunctionLambdaExample]] but in Scala.
  *
  * Use cases include e.g. basic data sanitization, data anonymization by obfuscating sensitive data
  * fields (such as personally identifiable information aka PII).  This specific example reads
  * incoming text lines and converts each text line to all-uppercase.
  *
  * Requires a version of Scala that supports Java 8 and SAM / Java lambda (e.g. Scala 2.11 with
  * `-Xexperimental` compiler flag, or 2.12).
  *
  * HOW TO RUN THIS EXAMPLE
  *
  * 1) Start Zookeeper and Kafka.
  * Please refer to <a href='http://docs.confluent.io/current/quickstart.html#quickstart'>QuickStart</a>.
  *
  * 2) Create the input and output topics used by this example.
  *
  * {{{
  * $ bin/kafka-topics --create --topic TextLinesTopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
  * $ bin/kafka-topics --create --topic UppercasedTextLinesTopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
  * $ bin/kafka-topics --create --topic OriginalAndUppercasedTopic --zookeeper localhost:2181 --partitions 1 --replication-factor 1
  * }}}
  *
  * Note: The above commands are for the Confluent Platform. For Apache Kafka it should be
  * `bin/kafka-topics.sh ...`.
  *
  * 3) Start this example application either in your IDE or on the command line.
  *
  * If via the command line please refer to
  * <a href='https://github.com/confluentinc/kafka-streams-examples#packaging-and-running'>Packaging</a>.
  * Once packaged you can then run:
  *
  * {{{
  * $ java -cp target/kafka-streams-examples-4.0.0-SNAPSHOT-standalone.jar io.confluent.examples.streams.MapFunctionScalaExample
  * }}}
  *
  * 4) Write some input data to the source topics (e.g. via `kafka-console-producer`).  The already
  * running example application (step 3) will automatically process this input data and write the
  * results to the output topics.
  *
  * {{{
  * # Start the console producer.  You can then enter input data by writing some line of text,
  * # followed by ENTER:
  * #
  * #   hello kafka streams<ENTER>
  * #   all streams lead to kafka<ENTER>
  * #
  * # Every line you enter will become the value of a single Kafka message.
  * $ bin/kafka-console-producer --broker-list localhost:9092 --topic TextLinesTopic
  * }}}
  *
  * 5) Inspect the resulting data in the output topics, e.g. via `kafka-console-consumer`.
  *
  * {{{
  * $ bin/kafka-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic UppercasedTextLinesTopic --from-beginning
  * $ bin/kafka-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic OriginalAndUppercasedTopic --from-beginning
  * }}}
  *
  * You should see output data similar to:
  * {{{
  * HELLO KAFKA STREAMS
  * ALL STREAMS LEAD TO KAFKA
  * }}}
  *
  * 6) Once you're done with your experiments, you can stop this example via `Ctrl-C`.  If needed,
  * also stop the Kafka broker (`Ctrl-C`), and only then stop the ZooKeeper instance (`Ctrl-C`).
  */
object MapFunctionScalaExample {

  /**
    * Builds and starts the example topology.
    *
    * @param args optional; `args(0)` may override the Kafka bootstrap servers
    *             (defaults to `localhost:9092`).
    */
  def main(args: Array[String]): Unit = {
    val bootstrapServers = if (args.length > 0) args(0) else "localhost:9092"
    val builder = new KStreamBuilder

    val streamingConfig = {
      val settings = new Properties
      settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "map-function-scala-example")
      settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers)
      // Input records have opaque byte-array keys and String values, so configure
      // the matching default serdes.  NOTE: the original code set the *key* serde
      // config twice, leaving the value serde unconfigured -- fixed to set
      // DEFAULT_VALUE_SERDE_CLASS_CONFIG for the String values.
      settings.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, classOf[ByteArraySerde])
      settings.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, classOf[StringSerde])
      settings
    }

    // Explicit String serde, needed below where the stream's key type changes
    // away from the configured default (Array[Byte]).
    val stringSerde: Serde[String] = Serdes.String()

    // Read the input Kafka topic into a KStream instance.
    val textLines: KStream[Array[Byte], String] = builder.stream("TextLinesTopic")

    // Variant 1: uppercase the value only; keys are untouched, so the
    // configured default serdes suffice when writing to the output topic.
    val uppercasedWithMapValues: KStream[Array[Byte], String] = textLines.mapValues(_.toUpperCase())
    uppercasedWithMapValues.to("UppercasedTextLinesTopic")

    // Variant 2: re-key the stream so that the original line becomes the key and
    // the uppercased line the value.  Because the key type changed from
    // Array[Byte] to String, explicit serdes must be passed to `to`.
    val originalAndUppercased: KStream[String, String] =
      textLines.map((_, value) => KeyValue.pair(value, value.toUpperCase))
    originalAndUppercased.to(stringSerde, stringSerde, "OriginalAndUppercasedTopic")

    val streams: KafkaStreams = new KafkaStreams(builder, streamingConfig)
    streams.start()
  }
}